package com.bw.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

/**
 * @ClassName: FlinkKafka
 * @Author: huzx
 * @Date: 2025-01-03 21:23
 * @Version: 1.0
 **/
public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node101:9092,node102:9092,node103:9092")
                .setTopics("bw-data")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        ds.print();

        env.execute("Window WordCount");
    }
}

/*
/opt/module/kafka/bin/kafka-console-producer.sh  \
--broker-list node101:9092,node102:9092,node103:9092  \
--topic bw-data

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#kafka-source
 */